package com.bytecub.gateway.mq.excutor;

import com.alibaba.fastjson.JSONObject;
import com.bytecub.common.constants.BCConstants;
import com.bytecub.common.domain.dto.request.device.DeviceUpdateReqDto;
import com.bytecub.common.domain.gateway.mq.DeviceUpMessageBo;
import com.bytecub.common.domain.gateway.mq.UpgradeMessageBo;
import com.bytecub.common.domain.message.DeviceReportMessage;
import com.bytecub.common.domain.message.UpgradeReplyMessage;
import com.bytecub.common.enums.UpgradeEnum;
import com.bytecub.gateway.mq.services.IUpMessageParseService;
import com.bytecub.mdm.cache.IMessageCountCache;
import com.bytecub.mdm.service.IDeviceService;
import com.bytecub.mdm.service.ITaskDetailService;
import com.bytecub.protocol.base.IBaseProtocol;
import com.bytecub.utils.JSONProvider;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * * ByteCub.cn. * Copyright (c) 2020-2021 All Rights Reserved. * * @author bytecub@163.com songbin
 * * @version Id: DeviceMessageReplyExecutor.java, v 0.1 2021-02-25 Exp $$
 */
@Service
@Slf4j
public class DeviceMessageReplyExecutor {
    // Collaborators are injected by Spring; this class keeps no other state.
    @Autowired private IMessageCountCache messageCountCache;
    @Autowired private IDeviceService deviceService;
    @Autowired private ITaskDetailService taskDetailService;
    @Autowired private IUpMessageParseService messageParseService;

    /**
     * Consumes a single device reply message.
     *
     * <p>Annotated with {@code @Async} using the executor name {@link
     * BCConstants.TASK#DEVICE_REPLY_MESSAGE_NAME}. The daily message counter is incremented
     * first. If the topic ends with one of the upgrade-reply suffixes, the message is rebuilt,
     * decoded with the protocol registered for its product code and handed to
     * {@link #upgrade}; every other topic is delegated to
     * {@link IUpMessageParseService#processReply}.
     *
     * <p>Any exception is caught and logged, never propagated.
     *
     * @param msg the message received from the device
     */
    @Async(BCConstants.TASK.DEVICE_REPLY_MESSAGE_NAME)
    public void execute(DeviceUpMessageBo msg) {
        try {
            messageCountCache.todayTotalIncr();

            String topic = msg.getTopic();
            boolean upgradeReply =
                    topic.endsWith(BCConstants.TOPIC.UPGRADE_REPLY)
                            || topic.endsWith(BCConstants.TOPIC.SUB_UPGRADE_REPLY);
            if (!upgradeReply) {
                messageParseService.processReply(msg);
                return;
            }

            messageParseService.rebuildMsg(msg);
            IBaseProtocol protocolService =
                    messageParseService.queryProtocolByProductCode(msg.getProductCode());
            if (protocolService == null) {
                // Without a protocol the payload cannot be decoded; say so explicitly
                // instead of failing with an anonymous NullPointerException.
                log.warn(
                        "No protocol found for upgrade reply, productCode={}, topic={}",
                        msg.getProductCode(),
                        msg.getTopic());
                return;
            }

            // Convert the raw device payload into the platform's message structure.
            DeviceReportMessage deviceMessage =
                    protocolService.decode(
                            msg.getTopic(), msg.getDeviceCode(), msg.getSourceMsg());
            this.upgrade(msg, deviceMessage);
        } catch (Exception e) {
            log.warn("内部队列消费异常", e);
        }
    }

    /**
     * Handles an upgrade-command reply reported by a device. Upgrade replies are processed
     * separately because they are not stored in ES.
     *
     * <p>The reply JSON must contain a {@code "result"} object that maps onto {@link
     * UpgradeReplyMessage}. The task detail is updated with the reported status and, when the
     * upgrade succeeded, the device's firmware version is set to the version returned by the
     * task-detail update. Malformed replies are logged and ignored.
     *
     * @param msg the raw message as received (source of the device code)
     * @param deviceMessage the decoded message; may be {@code null} if decoding produced nothing
     */
    private void upgrade(DeviceUpMessageBo msg, DeviceReportMessage deviceMessage) {
        if (deviceMessage == null) {
            log.warn(
                    "Upgrade reply could not be decoded, topic={}, deviceCode={}",
                    msg.getTopic(),
                    msg.getDeviceCode());
            return;
        }

        String reply = deviceMessage.getReplyMessage();
        if (reply == null || reply.isEmpty()) {
            log.warn(
                    "Upgrade reply has no body, messageId={}, deviceCode={}",
                    deviceMessage.getMessageId(),
                    msg.getDeviceCode());
            return;
        }

        Map<?, ?> replyFields = JSONProvider.parseObject(reply, Map.class);
        Object rawResult = replyFields == null ? null : replyFields.get("result");
        if (!(rawResult instanceof JSONObject)) {
            log.warn(
                    "Upgrade reply has no \"result\" object, messageId={}, deviceCode={}",
                    deviceMessage.getMessageId(),
                    msg.getDeviceCode());
            return;
        }

        UpgradeReplyMessage upgradeReplyMessage =
                JSONProvider.parseJsonObject((JSONObject) rawResult, UpgradeReplyMessage.class);
        if (upgradeReplyMessage == null) {
            log.warn(
                    "Upgrade reply \"result\" could not be parsed, messageId={}, deviceCode={}",
                    deviceMessage.getMessageId(),
                    msg.getDeviceCode());
            return;
        }

        UpgradeMessageBo messageBo = new UpgradeMessageBo();
        messageBo.setMessageId(deviceMessage.getMessageId());
        upgradeReplyMessage.setMessageId(messageBo.getMessageId());

        UpgradeEnum upgradeEnum = UpgradeEnum.explain(upgradeReplyMessage.getCode());
        // The device-supplied message is only recorded for failed upgrades.
        String extMsg = UpgradeEnum.FAILED.equals(upgradeEnum) ? upgradeReplyMessage.getMsg() : "";
        String version = taskDetailService.update(messageBo, upgradeEnum, extMsg);

        if (UpgradeEnum.SUCCESS.equals(upgradeEnum)) {
            // This method never sets a device code on messageBo, so unless the task-detail
            // update filled it in, it is null here. Fall back to the code carried by the
            // incoming message so the firmware version is written to a real device.
            // NOTE(review): confirm msg.getDeviceCode() is the upgraded device for
            // sub-device upgrade replies as well.
            String deviceCode = messageBo.getDeviceCode();
            if (deviceCode == null) {
                deviceCode = msg.getDeviceCode();
            }
            DeviceUpdateReqDto reqDto = new DeviceUpdateReqDto();
            reqDto.setDeviceCode(deviceCode);
            reqDto.setFirmwareVersion(version);
            deviceService.updateByCode(reqDto);
        }
    }
}
